home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_socket.py < prev    next >
Text File  |  2005-10-18  |  27KB  |  839 lines

  1. #!/usr/bin/env python
  2.  
  3. import unittest
  4. from test import test_support
  5.  
  6. import socket
  7. import select
  8. import time
  9. import thread, threading
  10. import Queue
  11. import sys
  12. from weakref import proxy
  13.  
  14. PORT = 50007
  15. HOST = 'localhost'
  16. MSG = 'Michael Gilfix was here\n'
  17.  
  18. class SocketTCPTest(unittest.TestCase):
  19.  
  20.     def setUp(self):
  21.         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  22.         self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  23.         self.serv.bind((HOST, PORT))
  24.         self.serv.listen(1)
  25.  
  26.     def tearDown(self):
  27.         self.serv.close()
  28.         self.serv = None
  29.  
  30. class SocketUDPTest(unittest.TestCase):
  31.  
  32.     def setUp(self):
  33.         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  34.         self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  35.         self.serv.bind((HOST, PORT))
  36.  
  37.     def tearDown(self):
  38.         self.serv.close()
  39.         self.serv = None
  40.  
  41. class ThreadableTest:
  42.     """Threadable Test class
  43.  
  44.     The ThreadableTest class makes it easy to create a threaded
  45.     client/server pair from an existing unit test. To create a
  46.     new threaded class from an existing unit test, use multiple
  47.     inheritance:
  48.  
  49.         class NewClass (OldClass, ThreadableTest):
  50.             pass
  51.  
  52.     This class defines two new fixture functions with obvious
  53.     purposes for overriding:
  54.  
  55.         clientSetUp ()
  56.         clientTearDown ()
  57.  
  58.     Any new test functions within the class must then define
  59.     tests in pairs, where the test name is preceeded with a
  60.     '_' to indicate the client portion of the test. Ex:
  61.  
  62.         def testFoo(self):
  63.             # Server portion
  64.  
  65.         def _testFoo(self):
  66.             # Client portion
  67.  
  68.     Any exceptions raised by the clients during their tests
  69.     are caught and transferred to the main thread to alert
  70.     the testing framework.
  71.  
  72.     Note, the server setup function cannot call any blocking
  73.     functions that rely on the client thread during setup,
  74.     unless serverExplicityReady() is called just before
  75.     the blocking call (such as in setting up a client/server
  76.     connection and performing the accept() in setUp().
  77.     """
  78.  
  79.     def __init__(self):
  80.         # Swap the true setup function
  81.         self.__setUp = self.setUp
  82.         self.__tearDown = self.tearDown
  83.         self.setUp = self._setUp
  84.         self.tearDown = self._tearDown
  85.  
  86.     def serverExplicitReady(self):
  87.         """This method allows the server to explicitly indicate that
  88.         it wants the client thread to proceed. This is useful if the
  89.         server is about to execute a blocking routine that is
  90.         dependent upon the client thread during its setup routine."""
  91.         self.server_ready.set()
  92.  
  93.     def _setUp(self):
  94.         self.server_ready = threading.Event()
  95.         self.client_ready = threading.Event()
  96.         self.done = threading.Event()
  97.         self.queue = Queue.Queue(1)
  98.  
  99.         # Do some munging to start the client test.
  100.         methodname = self.id()
  101.         i = methodname.rfind('.')
  102.         methodname = methodname[i+1:]
  103.         test_method = getattr(self, '_' + methodname)
  104.         self.client_thread = thread.start_new_thread(
  105.             self.clientRun, (test_method,))
  106.  
  107.         self.__setUp()
  108.         if not self.server_ready.isSet():
  109.             self.server_ready.set()
  110.         self.client_ready.wait()
  111.  
  112.     def _tearDown(self):
  113.         self.__tearDown()
  114.         self.done.wait()
  115.  
  116.         if not self.queue.empty():
  117.             msg = self.queue.get()
  118.             self.fail(msg)
  119.  
  120.     def clientRun(self, test_func):
  121.         self.server_ready.wait()
  122.         self.client_ready.set()
  123.         self.clientSetUp()
  124.         if not callable(test_func):
  125.             raise TypeError, "test_func must be a callable function"
  126.         try:
  127.             test_func()
  128.         except Exception, strerror:
  129.             self.queue.put(strerror)
  130.         self.clientTearDown()
  131.  
  132.     def clientSetUp(self):
  133.         raise NotImplementedError, "clientSetUp must be implemented."
  134.  
  135.     def clientTearDown(self):
  136.         self.done.set()
  137.         thread.exit()
  138.  
  139. class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
  140.  
  141.     def __init__(self, methodName='runTest'):
  142.         SocketTCPTest.__init__(self, methodName=methodName)
  143.         ThreadableTest.__init__(self)
  144.  
  145.     def clientSetUp(self):
  146.         self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  147.  
  148.     def clientTearDown(self):
  149.         self.cli.close()
  150.         self.cli = None
  151.         ThreadableTest.clientTearDown(self)
  152.  
  153. class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
  154.  
  155.     def __init__(self, methodName='runTest'):
  156.         SocketUDPTest.__init__(self, methodName=methodName)
  157.         ThreadableTest.__init__(self)
  158.  
  159.     def clientSetUp(self):
  160.         self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  161.  
  162. class SocketConnectedTest(ThreadedTCPSocketTest):
  163.  
  164.     def __init__(self, methodName='runTest'):
  165.         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
  166.  
  167.     def setUp(self):
  168.         ThreadedTCPSocketTest.setUp(self)
  169.         # Indicate explicitly we're ready for the client thread to
  170.         # proceed and then perform the blocking call to accept
  171.         self.serverExplicitReady()
  172.         conn, addr = self.serv.accept()
  173.         self.cli_conn = conn
  174.  
  175.     def tearDown(self):
  176.         self.cli_conn.close()
  177.         self.cli_conn = None
  178.         ThreadedTCPSocketTest.tearDown(self)
  179.  
  180.     def clientSetUp(self):
  181.         ThreadedTCPSocketTest.clientSetUp(self)
  182.         self.cli.connect((HOST, PORT))
  183.         self.serv_conn = self.cli
  184.  
  185.     def clientTearDown(self):
  186.         self.serv_conn.close()
  187.         self.serv_conn = None
  188.         ThreadedTCPSocketTest.clientTearDown(self)
  189.  
  190. class SocketPairTest(unittest.TestCase, ThreadableTest):
  191.  
  192.     def __init__(self, methodName='runTest'):
  193.         unittest.TestCase.__init__(self, methodName=methodName)
  194.         ThreadableTest.__init__(self)
  195.  
  196.     def setUp(self):
  197.         self.serv, self.cli = socket.socketpair()
  198.  
  199.     def tearDown(self):
  200.         self.serv.close()
  201.         self.serv = None
  202.  
  203.     def clientSetUp(self):
  204.         pass
  205.  
  206.     def clientTearDown(self):
  207.         self.cli.close()
  208.         self.cli = None
  209.         ThreadableTest.clientTearDown(self)
  210.  
  211.  
  212. #######################################################################
  213. ## Begin Tests
  214.  
  215. class GeneralModuleTests(unittest.TestCase):
  216.  
  217.     def test_weakref(self):
  218.         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  219.         p = proxy(s)
  220.         self.assertEqual(p.fileno(), s.fileno())
  221.         s.close()
  222.         s = None
  223.         try:
  224.             p.fileno()
  225.         except ReferenceError:
  226.             pass
  227.         else:
  228.             self.fail('Socket proxy still exists')
  229.  
  230.     def testSocketError(self):
  231.         # Testing socket module exceptions
  232.         def raise_error(*args, **kwargs):
  233.             raise socket.error
  234.         def raise_herror(*args, **kwargs):
  235.             raise socket.herror
  236.         def raise_gaierror(*args, **kwargs):
  237.             raise socket.gaierror
  238.         self.failUnlessRaises(socket.error, raise_error,
  239.                               "Error raising socket exception.")
  240.         self.failUnlessRaises(socket.error, raise_herror,
  241.                               "Error raising socket exception.")
  242.         self.failUnlessRaises(socket.error, raise_gaierror,
  243.                               "Error raising socket exception.")
  244.  
  245.     def testCrucialConstants(self):
  246.         # Testing for mission critical constants
  247.         socket.AF_INET
  248.         socket.SOCK_STREAM
  249.         socket.SOCK_DGRAM
  250.         socket.SOCK_RAW
  251.         socket.SOCK_RDM
  252.         socket.SOCK_SEQPACKET
  253.         socket.SOL_SOCKET
  254.         socket.SO_REUSEADDR
  255.  
  256.     def testHostnameRes(self):
  257.         # Testing hostname resolution mechanisms
  258.         hostname = socket.gethostname()
  259.         try:
  260.             ip = socket.gethostbyname(hostname)
  261.         except socket.error:
  262.             # Probably name lookup wasn't set up right; skip this test
  263.             return
  264.         self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
  265.         try:
  266.             hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
  267.         except socket.error:
  268.             # Probably a similar problem as above; skip this test
  269.             return
  270.         all_host_names = [hostname, hname] + aliases
  271.         fqhn = socket.getfqdn()
  272.         if not fqhn in all_host_names:
  273.             self.fail("Error testing host resolution mechanisms.")
  274.  
  275.     def testRefCountGetNameInfo(self):
  276.         # Testing reference count for getnameinfo
  277.         import sys
  278.         if hasattr(sys, "getrefcount"):
  279.             try:
  280.                 # On some versions, this loses a reference
  281.                 orig = sys.getrefcount(__name__)
  282.                 socket.getnameinfo(__name__,0)
  283.             except SystemError:
  284.                 if sys.getrefcount(__name__) <> orig:
  285.                     self.fail("socket.getnameinfo loses a reference")
  286.  
  287.     def testInterpreterCrash(self):
  288.         # Making sure getnameinfo doesn't crash the interpreter
  289.         try:
  290.             # On some versions, this crashes the interpreter.
  291.             socket.getnameinfo(('x', 0, 0, 0), 0)
  292.         except socket.error:
  293.             pass
  294.  
  295.     def testNtoH(self):
  296.         # This just checks that htons etc. are their own inverse,
  297.         # when looking at the lower 16 or 32 bits.
  298.         sizes = {socket.htonl: 32, socket.ntohl: 32,
  299.                  socket.htons: 16, socket.ntohs: 16}
  300.         for func, size in sizes.items():
  301.             mask = (1L<<size) - 1
  302.             for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
  303.                 self.assertEqual(i & mask, func(func(i&mask)) & mask)
  304.  
  305.             swapped = func(mask)
  306.             self.assertEqual(swapped & mask, mask)
  307.             self.assertRaises(OverflowError, func, 1L<<34)
  308.  
  309.     def testGetServBy(self):
  310.         eq = self.assertEqual
  311.         # Find one service that exists, then check all the related interfaces.
  312.         # I've ordered this by protocols that have both a tcp and udp
  313.         # protocol, at least for modern Linuxes.
  314.         if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
  315.                             'darwin'):
  316.             # avoid the 'echo' service on this platform, as there is an
  317.             # assumption breaking non-standard port/protocol entry
  318.             services = ('daytime', 'qotd', 'domain')
  319.         else:
  320.             services = ('echo', 'daytime', 'domain')
  321.         for service in services:
  322.             try:
  323.                 port = socket.getservbyname(service, 'tcp')
  324.                 break
  325.             except socket.error:
  326.                 pass
  327.         else:
  328.             raise socket.error
  329.         # Try same call with optional protocol omitted
  330.         port2 = socket.getservbyname(service)
  331.         eq(port, port2)
  332.         # Try udp, but don't barf it it doesn't exist
  333.         try:
  334.             udpport = socket.getservbyname(service, 'udp')
  335.         except socket.error:
  336.             udpport = None
  337.         else:
  338.             eq(udpport, port)
  339.         # Now make sure the lookup by port returns the same service name
  340.         eq(socket.getservbyport(port2), service)
  341.         eq(socket.getservbyport(port, 'tcp'), service)
  342.         if udpport is not None:
  343.             eq(socket.getservbyport(udpport, 'udp'), service)
  344.  
  345.     def testDefaultTimeout(self):
  346.         # Testing default timeout
  347.         # The default timeout should initially be None
  348.         self.assertEqual(socket.getdefaulttimeout(), None)
  349.         s = socket.socket()
  350.         self.assertEqual(s.gettimeout(), None)
  351.         s.close()
  352.  
  353.         # Set the default timeout to 10, and see if it propagates
  354.         socket.setdefaulttimeout(10)
  355.         self.assertEqual(socket.getdefaulttimeout(), 10)
  356.         s = socket.socket()
  357.         self.assertEqual(s.gettimeout(), 10)
  358.         s.close()
  359.  
  360.         # Reset the default timeout to None, and see if it propagates
  361.         socket.setdefaulttimeout(None)
  362.         self.assertEqual(socket.getdefaulttimeout(), None)
  363.         s = socket.socket()
  364.         self.assertEqual(s.gettimeout(), None)
  365.         s.close()
  366.  
  367.         # Check that setting it to an invalid value raises ValueError
  368.         self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
  369.  
  370.         # Check that setting it to an invalid type raises TypeError
  371.         self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
  372.  
  373.     def testIPv4toString(self):
  374.         if not hasattr(socket, 'inet_pton'):
  375.             return # No inet_pton() on this platform
  376.         from socket import inet_aton as f, inet_pton, AF_INET
  377.         g = lambda a: inet_pton(AF_INET, a)
  378.  
  379.         self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
  380.         self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
  381.         self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
  382.         self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
  383.         self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
  384.  
  385.         self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
  386.         self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
  387.         self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
  388.         self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
  389.  
  390.     def testIPv6toString(self):
  391.         if not hasattr(socket, 'inet_pton'):
  392.             return # No inet_pton() on this platform
  393.         try:
  394.             from socket import inet_pton, AF_INET6, has_ipv6
  395.             if not has_ipv6:
  396.                 return
  397.         except ImportError:
  398.             return
  399.         f = lambda a: inet_pton(AF_INET6, a)
  400.  
  401.         self.assertEquals('\x00' * 16, f('::'))
  402.         self.assertEquals('\x00' * 16, f('0::0'))
  403.         self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
  404.         self.assertEquals(
  405.             '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
  406.             f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
  407.         )
  408.  
  409.     def testStringToIPv4(self):
  410.         if not hasattr(socket, 'inet_ntop'):
  411.             return # No inet_ntop() on this platform
  412.         from socket import inet_ntoa as f, inet_ntop, AF_INET
  413.         g = lambda a: inet_ntop(AF_INET, a)
  414.  
  415.         self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
  416.         self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
  417.         self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
  418.         self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
  419.  
  420.         self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
  421.         self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
  422.         self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
  423.  
  424.     def testStringToIPv6(self):
  425.         if not hasattr(socket, 'inet_ntop'):
  426.             return # No inet_ntop() on this platform
  427.         try:
  428.             from socket import inet_ntop, AF_INET6, has_ipv6
  429.             if not has_ipv6:
  430.                 return
  431.         except ImportError:
  432.             return
  433.         f = lambda a: inet_ntop(AF_INET6, a)
  434.  
  435.         self.assertEquals('::', f('\x00' * 16))
  436.         self.assertEquals('::1', f('\x00' * 15 + '\x01'))
  437.         self.assertEquals(
  438.             'aef:b01:506:1001:ffff:9997:55:170',
  439.             f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
  440.         )
  441.  
  442.     # XXX The following don't test module-level functionality...
  443.  
  444.     def testSockName(self):
  445.         # Testing getsockname()
  446.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  447.         sock.bind(("0.0.0.0", PORT+1))
  448.         name = sock.getsockname()
  449.         self.assertEqual(name, ("0.0.0.0", PORT+1))
  450.  
  451.     def testGetSockOpt(self):
  452.         # Testing getsockopt()
  453.         # We know a socket should start without reuse==0
  454.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  455.         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
  456.         self.failIf(reuse != 0, "initial mode is reuse")
  457.  
  458.     def testSetSockOpt(self):
  459.         # Testing setsockopt()
  460.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  461.         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  462.         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
  463.         self.failIf(reuse == 0, "failed to set reuse mode")
  464.  
  465.     def testSendAfterClose(self):
  466.         # testing send() after close() with timeout
  467.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  468.         sock.settimeout(1)
  469.         sock.close()
  470.         self.assertRaises(socket.error, sock.send, "spam")
  471.  
  472. class BasicTCPTest(SocketConnectedTest):
  473.  
  474.     def __init__(self, methodName='runTest'):
  475.         SocketConnectedTest.__init__(self, methodName=methodName)
  476.  
  477.     def testRecv(self):
  478.         # Testing large receive over TCP
  479.         msg = self.cli_conn.recv(1024)
  480.         self.assertEqual(msg, MSG)
  481.  
  482.     def _testRecv(self):
  483.         self.serv_conn.send(MSG)
  484.  
  485.     def testOverFlowRecv(self):
  486.         # Testing receive in chunks over TCP
  487.         seg1 = self.cli_conn.recv(len(MSG) - 3)
  488.         seg2 = self.cli_conn.recv(1024)
  489.         msg = seg1 + seg2
  490.         self.assertEqual(msg, MSG)
  491.  
  492.     def _testOverFlowRecv(self):
  493.         self.serv_conn.send(MSG)
  494.  
  495.     def testRecvFrom(self):
  496.         # Testing large recvfrom() over TCP
  497.         msg, addr = self.cli_conn.recvfrom(1024)
  498.         self.assertEqual(msg, MSG)
  499.  
  500.     def _testRecvFrom(self):
  501.         self.serv_conn.send(MSG)
  502.  
  503.     def testOverFlowRecvFrom(self):
  504.         # Testing recvfrom() in chunks over TCP
  505.         seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
  506.         seg2, addr = self.cli_conn.recvfrom(1024)
  507.         msg = seg1 + seg2
  508.         self.assertEqual(msg, MSG)
  509.  
  510.     def _testOverFlowRecvFrom(self):
  511.         self.serv_conn.send(MSG)
  512.  
  513.     def testSendAll(self):
  514.         # Testing sendall() with a 2048 byte string over TCP
  515.         msg = ''
  516.         while 1:
  517.             read = self.cli_conn.recv(1024)
  518.             if not read:
  519.                 break
  520.             msg += read
  521.         self.assertEqual(msg, 'f' * 2048)
  522.  
  523.     def _testSendAll(self):
  524.         big_chunk = 'f' * 2048
  525.         self.serv_conn.sendall(big_chunk)
  526.  
  527.     def testFromFd(self):
  528.         # Testing fromfd()
  529.         if not hasattr(socket, "fromfd"):
  530.             return # On Windows, this doesn't exist
  531.         fd = self.cli_conn.fileno()
  532.         sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
  533.         msg = sock.recv(1024)
  534.         self.assertEqual(msg, MSG)
  535.  
  536.     def _testFromFd(self):
  537.         self.serv_conn.send(MSG)
  538.  
  539.     def testShutdown(self):
  540.         # Testing shutdown()
  541.         msg = self.cli_conn.recv(1024)
  542.         self.assertEqual(msg, MSG)
  543.  
  544.     def _testShutdown(self):
  545.         self.serv_conn.send(MSG)
  546.         self.serv_conn.shutdown(2)
  547.  
  548. class BasicUDPTest(ThreadedUDPSocketTest):
  549.  
  550.     def __init__(self, methodName='runTest'):
  551.         ThreadedUDPSocketTest.__init__(self, methodName=methodName)
  552.  
  553.     def testSendtoAndRecv(self):
  554.         # Testing sendto() and Recv() over UDP
  555.         msg = self.serv.recv(len(MSG))
  556.         self.assertEqual(msg, MSG)
  557.  
  558.     def _testSendtoAndRecv(self):
  559.         self.cli.sendto(MSG, 0, (HOST, PORT))
  560.  
  561.     def testRecvFrom(self):
  562.         # Testing recvfrom() over UDP
  563.         msg, addr = self.serv.recvfrom(len(MSG))
  564.         self.assertEqual(msg, MSG)
  565.  
  566.     def _testRecvFrom(self):
  567.         self.cli.sendto(MSG, 0, (HOST, PORT))
  568.  
  569. class BasicSocketPairTest(SocketPairTest):
  570.  
  571.     def __init__(self, methodName='runTest'):
  572.         SocketPairTest.__init__(self, methodName=methodName)
  573.  
  574.     def testRecv(self):
  575.         msg = self.serv.recv(1024)
  576.         self.assertEqual(msg, MSG)
  577.  
  578.     def _testRecv(self):
  579.         self.cli.send(MSG)
  580.  
  581.     def testSend(self):
  582.         self.serv.send(MSG)
  583.  
  584.     def _testSend(self):
  585.         msg = self.cli.recv(1024)
  586.         self.assertEqual(msg, MSG)
  587.  
  588. class NonBlockingTCPTests(ThreadedTCPSocketTest):
  589.  
  590.     def __init__(self, methodName='runTest'):
  591.         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
  592.  
  593.     def testSetBlocking(self):
  594.         # Testing whether set blocking works
  595.         self.serv.setblocking(0)
  596.         start = time.time()
  597.         try:
  598.             self.serv.accept()
  599.         except socket.error:
  600.             pass
  601.         end = time.time()
  602.         self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
  603.  
  604.     def _testSetBlocking(self):
  605.         pass
  606.  
  607.     def testAccept(self):
  608.         # Testing non-blocking accept
  609.         self.serv.setblocking(0)
  610.         try:
  611.             conn, addr = self.serv.accept()
  612.         except socket.error:
  613.             pass
  614.         else:
  615.             self.fail("Error trying to do non-blocking accept.")
  616.         read, write, err = select.select([self.serv], [], [])
  617.         if self.serv in read:
  618.             conn, addr = self.serv.accept()
  619.         else:
  620.             self.fail("Error trying to do accept after select.")
  621.  
  622.     def _testAccept(self):
  623.         time.sleep(0.1)
  624.         self.cli.connect((HOST, PORT))
  625.  
  626.     def testConnect(self):
  627.         # Testing non-blocking connect
  628.         conn, addr = self.serv.accept()
  629.  
  630.     def _testConnect(self):
  631.         self.cli.settimeout(10)
  632.         self.cli.connect((HOST, PORT))
  633.  
  634.     def testRecv(self):
  635.         # Testing non-blocking recv
  636.         conn, addr = self.serv.accept()
  637.         conn.setblocking(0)
  638.         try:
  639.             msg = conn.recv(len(MSG))
  640.         except socket.error:
  641.             pass
  642.         else:
  643.             self.fail("Error trying to do non-blocking recv.")
  644.         read, write, err = select.select([conn], [], [])
  645.         if conn in read:
  646.             msg = conn.recv(len(MSG))
  647.             self.assertEqual(msg, MSG)
  648.         else:
  649.             self.fail("Error during select call to non-blocking socket.")
  650.  
  651.     def _testRecv(self):
  652.         self.cli.connect((HOST, PORT))
  653.         time.sleep(0.1)
  654.         self.cli.send(MSG)
  655.  
  656. class FileObjectClassTestCase(SocketConnectedTest):
  657.  
  658.     bufsize = -1 # Use default buffer size
  659.  
  660.     def __init__(self, methodName='runTest'):
  661.         SocketConnectedTest.__init__(self, methodName=methodName)
  662.  
  663.     def setUp(self):
  664.         SocketConnectedTest.setUp(self)
  665.         self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
  666.  
  667.     def tearDown(self):
  668.         self.serv_file.close()
  669.         self.assert_(self.serv_file.closed)
  670.         self.serv_file = None
  671.         SocketConnectedTest.tearDown(self)
  672.  
  673.     def clientSetUp(self):
  674.         SocketConnectedTest.clientSetUp(self)
  675.         self.cli_file = self.serv_conn.makefile('wb')
  676.  
  677.     def clientTearDown(self):
  678.         self.cli_file.close()
  679.         self.assert_(self.cli_file.closed)
  680.         self.cli_file = None
  681.         SocketConnectedTest.clientTearDown(self)
  682.  
  683.     def testSmallRead(self):
  684.         # Performing small file read test
  685.         first_seg = self.serv_file.read(len(MSG)-3)
  686.         second_seg = self.serv_file.read(3)
  687.         msg = first_seg + second_seg
  688.         self.assertEqual(msg, MSG)
  689.  
  690.     def _testSmallRead(self):
  691.         self.cli_file.write(MSG)
  692.         self.cli_file.flush()
  693.  
  694.     def testFullRead(self):
  695.         # read until EOF
  696.         msg = self.serv_file.read()
  697.         self.assertEqual(msg, MSG)
  698.  
  699.     def _testFullRead(self):
  700.         self.cli_file.write(MSG)
  701.         self.cli_file.close()
  702.  
  703.     def testUnbufferedRead(self):
  704.         # Performing unbuffered file read test
  705.         buf = ''
  706.         while 1:
  707.             char = self.serv_file.read(1)
  708.             if not char:
  709.                 break
  710.             buf += char
  711.         self.assertEqual(buf, MSG)
  712.  
  713.     def _testUnbufferedRead(self):
  714.         self.cli_file.write(MSG)
  715.         self.cli_file.flush()
  716.  
  717.     def testReadline(self):
  718.         # Performing file readline test
  719.         line = self.serv_file.readline()
  720.         self.assertEqual(line, MSG)
  721.  
  722.     def _testReadline(self):
  723.         self.cli_file.write(MSG)
  724.         self.cli_file.flush()
  725.  
  726.     def testClosedAttr(self):
  727.         self.assert_(not self.serv_file.closed)
  728.  
  729.     def _testClosedAttr(self):
  730.         self.assert_(not self.cli_file.closed)
  731.  
  732. class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
  733.  
  734.     """Repeat the tests from FileObjectClassTestCase with bufsize==0.
  735.  
  736.     In this case (and in this case only), it should be possible to
  737.     create a file object, read a line from it, create another file
  738.     object, read another line from it, without loss of data in the
  739.     first file object's buffer.  Note that httplib relies on this
  740.     when reading multiple requests from the same socket."""
  741.  
  742.     bufsize = 0 # Use unbuffered mode
  743.  
  744.     def testUnbufferedReadline(self):
  745.         # Read a line, create a new file object, read another line with it
  746.         line = self.serv_file.readline() # first line
  747.         self.assertEqual(line, "A. " + MSG) # first line
  748.         self.serv_file = self.cli_conn.makefile('rb', 0)
  749.         line = self.serv_file.readline() # second line
  750.         self.assertEqual(line, "B. " + MSG) # second line
  751.  
  752.     def _testUnbufferedReadline(self):
  753.         self.cli_file.write("A. " + MSG)
  754.         self.cli_file.write("B. " + MSG)
  755.         self.cli_file.flush()
  756.  
  757. class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
  758.  
  759.     bufsize = 1 # Default-buffered for reading; line-buffered for writing
  760.  
  761.  
  762. class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
  763.  
  764.     bufsize = 2 # Exercise the buffering code
  765.  
  766. class TCPTimeoutTest(SocketTCPTest):
  767.  
  768.     def testTCPTimeout(self):
  769.         def raise_timeout(*args, **kwargs):
  770.             self.serv.settimeout(1.0)
  771.             self.serv.accept()
  772.         self.failUnlessRaises(socket.timeout, raise_timeout,
  773.                               "Error generating a timeout exception (TCP)")
  774.  
  775.     def testTimeoutZero(self):
  776.         ok = False
  777.         try:
  778.             self.serv.settimeout(0.0)
  779.             foo = self.serv.accept()
  780.         except socket.timeout:
  781.             self.fail("caught timeout instead of error (TCP)")
  782.         except socket.error:
  783.             ok = True
  784.         except:
  785.             self.fail("caught unexpected exception (TCP)")
  786.         if not ok:
  787.             self.fail("accept() returned success when we did not expect it")
  788.  
  789. class UDPTimeoutTest(SocketTCPTest):
  790.  
  791.     def testUDPTimeout(self):
  792.         def raise_timeout(*args, **kwargs):
  793.             self.serv.settimeout(1.0)
  794.             self.serv.recv(1024)
  795.         self.failUnlessRaises(socket.timeout, raise_timeout,
  796.                               "Error generating a timeout exception (UDP)")
  797.  
  798.     def testTimeoutZero(self):
  799.         ok = False
  800.         try:
  801.             self.serv.settimeout(0.0)
  802.             foo = self.serv.recv(1024)
  803.         except socket.timeout:
  804.             self.fail("caught timeout instead of error (UDP)")
  805.         except socket.error:
  806.             ok = True
  807.         except:
  808.             self.fail("caught unexpected exception (UDP)")
  809.         if not ok:
  810.             self.fail("recv() returned success when we did not expect it")
  811.  
  812. class TestExceptions(unittest.TestCase):
  813.  
  814.     def testExceptionTree(self):
  815.         self.assert_(issubclass(socket.error, Exception))
  816.         self.assert_(issubclass(socket.herror, socket.error))
  817.         self.assert_(issubclass(socket.gaierror, socket.error))
  818.         self.assert_(issubclass(socket.timeout, socket.error))
  819.  
  820.  
  821. def test_main():
  822.     tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions]
  823.     if sys.platform != 'mac':
  824.         tests.extend([ BasicUDPTest, UDPTimeoutTest ])
  825.  
  826.     tests.extend([
  827.         NonBlockingTCPTests,
  828.         FileObjectClassTestCase,
  829.         UnbufferedFileObjectClassTestCase,
  830.         LineBufferedFileObjectClassTestCase,
  831.         SmallBufferedFileObjectClassTestCase
  832.     ])
  833.     if hasattr(socket, "socketpair"):
  834.         tests.append(BasicSocketPairTest)
  835.     test_support.run_unittest(*tests)
  836.  
  837. if __name__ == "__main__":
  838.     test_main()
  839.